package com.zwcl.common.core.utils.thread;


import java.util.Date;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * 多线程分批次执行任务，最后需要一致完成后返回
 * 该类有两个问题：
 * 1. 如果数据量巨大，比如1000万，将导致开启的线程数过多
 * 2. 如果开固定线程数，如5，将每5个执行完，再执行下5个，这样不好
 * Created by Administrator on 2018/6/7.
 */
public abstract class ThreadDoJob<T> {
    //开启的线程的数量
    private final static int THREAD_MAX = 5;
    //单次执行的条数
    private final static int RUN_NUM=1000;
    //任务列表
    private List<T> JobList;
    //任务分片数
    private volatile int blockNum=0;
    private volatile RunResult runResult;
    //是否全部完成
    public volatile boolean finishFlag=false;
    //是否还有数据
    public volatile boolean haveDataFlag=true;
    //线程已经取到了哪个分片
    private volatile int curBlockNum=0;
    //线程已经执行完成的数量
    private int finishBlockNum=0;

    //抽象方法，被具体实现类集成实现
    protected abstract Boolean doBlockJob(List<T> blockJobList);

    //抽象方法，任务完成后回调调用方
    protected abstract void taskFinishCallBack();

    //构造初始化函数
    public ThreadDoJob(List<T> jobList){
        JobList=jobList;
        //计算分片数
        int count=JobList.size();
        int blocks=count/RUN_NUM;
        blockNum=(count%RUN_NUM==0)?blocks:(blocks+1);
        runResult=new RunResult();
        runResult.successFlag=true;
        runResult.successNum=0;
        System.out.println(new Date() +":分为"+blockNum+ "块执行");
    }

    //开始执行任务
    public void startJob()
    {
        Integer threadNum=blockNum>THREAD_MAX ? blockNum:THREAD_MAX;
        CyclicBarrier lock = new CyclicBarrier(threadNum,new BarrierRun());
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < threadNum; i++) {
            exec.submit(new CountdownLatchTask(lock,i));
        }
        exec.shutdown();
    }

    //返回操作成功的标志
    public boolean getFinishFlag(){
        return this.finishFlag;
    }

    //返回执行结果
    public RunResult getRunResult() {return this.runResult;}

    //获取任务块元数据（分页），取的时候要加锁
    public List<T> getBlockJobList(Integer i)
    {
        //同步加锁，防止当前块移动
        synchronized (this){
            System.out.println(new Date()+":线程"+i+" 获取数据块"+curBlockNum);
            int startIndex=curBlockNum*RUN_NUM;
            int endIndex=(curBlockNum+1)*RUN_NUM;
            if(JobList.size()<=startIndex)     //取完了
            {
                haveDataFlag=false;
                System.out.println(new Date()+":线程"+i+" 没有数据了");
                return null;
            }
            //有可能超越尾部
            List<T> curJobList= JobList.parallelStream().skip(startIndex).limit(RUN_NUM).collect(Collectors.toList());
            System.out.println(new Date()+":线程"+i+" 获取到了"+curJobList.size()+"条数据");
            curBlockNum++;
            return curJobList;
        }
    }

    private void updateFinishResult(Integer i,boolean succFlag,int succNum)
    {
        synchronized (this)
        {
            finishBlockNum=finishBlockNum+1;
            runResult.successFlag=runResult.successFlag&&succFlag;
            runResult.successNum=runResult.successNum+(succFlag?succNum:0);
            System.out.println(new Date()+":线程"+i+" 已经完成了"+finishBlockNum+"块");
        }
    }

    //内部类，实现线程工作
    class CountdownLatchTask implements Runnable{
        //屏障锁
        private final CyclicBarrier lock;
        //线程名称
        private final String threadName;
        //线程号
        private final int threadNo;

        CountdownLatchTask(CyclicBarrier lock, Integer threadNo) {
            this.lock = lock;
            this.threadName = "Thread-"+threadNo.toString();
            this.threadNo=threadNo;
        }
        //线程执行任务
        @Override
        public void run() {
            try {
                boolean continueFlag=true;
                while (continueFlag && haveDataFlag ){           //&& haveDataFlag
                    //获取当前线程需要操作的实体对象
                    List<T> curJobList=getBlockJobList(threadNo);   //取数据
                    if(curJobList!=null && curJobList.size()>0) {
                        System.out.println(new Date() + ":" + threadName + "数据准备完成,开始：" + curJobList.get(0).toString() + "总条数：" + curJobList.size());
                        //根据任务列表完成任务
                        boolean curSuccessFlag= doBlockJob(curJobList);
                        System.out.println(new Date() + ":" + threadName + "数据执行完成," + curJobList.get(0).toString());
                        updateFinishResult(threadNo,curSuccessFlag,curJobList.size());
                    }
                    else{
                        continueFlag=false;
                        System.out.println(new Date() + ":" + threadName + "不用继续执行了");
                    }
                    lock.await();
                }
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    class BarrierRun implements Runnable {
        @Override
        public void run() {
            System.out.println(new Date()+": 检验屏障参数值"+finishBlockNum);
            if(finishBlockNum==blockNum) {
                finishFlag = true;
                //有可能执行多次
                taskFinishCallBack();
            }
        }
    }

    class RunResult{
        //成功条数
        public long successNum=(long)0;
        //所有任务最终执行的结果
        public boolean successFlag=true;
    }

//    private Map<String, String> threadGetDomainContent(Map<String,String> urlMap){
//        Map<String,String> result=new HashMap<>();
//        //进行异步任务列表
//        List<FutureTask<Map<String,String> >> futureTasks = new ArrayList<FutureTask<Map<String,String> >>();
//        //线程池 初始化十个线程 和JDBC连接池是一个意思 实现重用
//        ExecutorService executorService = Executors.newFixedThreadPool(5);
//        long start = System.currentTimeMillis();
//        //类似与run方法的实现 Callable是一个接口，在call中手写逻辑代码
//        urlMap.entrySet().forEach(item->{
//            DomainTask task=new DomainTask();
//            task.setDomain(item.getKey());
//            task.setUrl(item.getValue());
//            task.setRestTemplate(this.restTemplate);
//            FutureTask<Map<String,String> > futureTask = new FutureTask<Map<String,String> >(task);
//            futureTasks.add(futureTask);
//            //提交异步任务到线程池，让线程池管理任务 特爽把。
//            //由于是异步并行任务，所以这里并不会阻塞
//            executorService.submit(futureTask);
//        });
//        for (FutureTask<Map<String,String> > futureTask : futureTasks) {
//            //futureTask.get() 得到我们想要的结果
//            //该方法有一个重载get(long timeout, TimeUnit unit) 第一个参数为最大等待时间，第二个为时间的单位
//            try {
//                // 阻塞，等待异步任务执行完毕-获取异步任务的返回值
//                Map<String,String>  content = futureTask.get();
//                result.putAll(content);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            } catch (ExecutionException e) {
//                e.printStackTrace();
//            }
//        }
//
//        //清理线程池
//        executorService.shutdown();
//        return result;
//    }
}
